Bullsharkの理想的な処理の説明: commit_one() in bullshark.rs
目的
Bullsharkコンセンサスプロトコルが理想的な形で4ラウンド実行されることをシミュレートしています。特に、第2ラウンドのリーダーがコミットされるかどうかを確認することを目的としています。
理解しないけといけない構造体
Bullshark::new()
LeaderSchedule::new()
commit_one()に記載されていないこと
実際の証明書ごとの順序付け,以下のどこかに記載されているはず
Bullshark::process_certificate()は何か?
リーダー選出
orderingの責任を持つあるラウンドでのリーダー選出プロセスを示している気がする
並び替えはやっていない
order_certificatesみたいな関数はないんか?
ConsensusState.dagをアップデートするものは?
ConsensusState::construct_dag_from_cert_store()は,ラウンドの数だけ証明書の数を特定し,restoreして保持しているだけ
ConsensusState::update()は?
新しい証明書がコミットされた後に古くなった証明書を削除してるだけ
Consensus::run_inner()かなり関係ありそう
perplexityでも確認した
直下に詳細記載
tx_sequenceチャネル
Consensus構造体内のフィールド内のtx_sequence
metered_channel::Sender<CommittedSubDag>で型指定
疑問点・仮説
subdag -> commited sub dagになるはず
CommittedSubDag構造体でrx_sequenceとかある??
rx_sequence変数が使われている該当プログラム
3つともそれっぽいのでリンクだけ羅列する
Subscriberはすでにコンセンサスによって並び替えられた(sequenced)証明書を受信
コンセンサス実行後にその結果を他のノードに通知するとか,マトリックスをまとめるとかそんな使い道な気がする
ハズレ
コミットされた証明書を取り扱うからexecutor/内にあるんやね
Executor::spawn()
Subscriberの生成と実行
rx_sequenceからコミットされたサブDAGを受信して処理する無限ループを含んでいることを考えると受け取った証明書はすでに並び替えられている?
test文あるやんけ,実際の並び替えはここか,Bullshark構造体や該当ファイルで呼び出されているか?検索せよ
test_recovery()
25-32. コンセンサスエンジン用のチャネルを設定します
35-45. Bullsharkコンセンサスインスタンスを作成し、コンセンサスを開始します。
46-47. プライマリチャネルの受信を処理するタスクを生成します。
48-54. 生成した証明書をコンセンサスに送信します。
55-69. コミットされた証明書の順序と数を確認します。
test_internal_consensus_output()
6-11. トランザクションクライアントを作成し、トランザクション確認チャネルを購読します。
サブスクライバーを作ってから順序付けをやっている?
let mut client = authority.new_transactions_client(&worker_id).await;
何をやっているのか?単にワーカーノードにtxを送信しているのか?
12-26. 10個の任意のトランザクションを作成し、クライアントを通じて送信します。
code:.rs
const NUM_OF_TRANSACTIONS: u32 = 10;
for i in 0..NUM_OF_TRANSACTIONS {
let tx = string_transaction(i);
// serialise and send
let tr = bcs::to_bytes(&tx).unwrap();
let txn = TransactionProto {
};
client.submit_transaction(txn).await.unwrap();
transactions.push(tx);
27-49. トランザクションの完了を待ち、以下を確認します:
- 全てのトランザクションが受信されること
- トランザクションが送信された順序と同じ順序で処理されること
結局タイムスタンプを確認して並び替えているだけか
そして,Narwhalの時点でバリデーションは終わってコミット可能な証明書になっているから,あとは各リーダーがその証明書をまとめてタイムスタンプに従ってコミットを他のバリデータに提出しているだけか
全文
code:.rs
async fn commit_one() {
let fixture = CommitteeFixture::builder().build(); // 新しいBuilder構造体を作る.Builder構造体は4つのcomittee_sizeと4つのワーカーを持つ
let committee = fixture.committee(); // committeeをcloneしてるだけ
let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect(); // authoritiesのidを取得(この数はAuthorityFixtureの数に依存 = 4)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>(); // 指定されたcomitteeの証明書を生成(in primary.rs)
// 証明書の数は (4 cert in 1 round) * 2
let (mut certificates, next_parents) = test_utils::make_optimal_certificates(
&committee, // committee
&latest_protocol_version(), // protocol_config
1..=2, // range 1 ~ 2, rounds_of_certificates()でroundごとになっている,つまり2ラウンド
&genesis, // initial_parents
&ids, // ids 4つ
);
// ラウンドr, r = 3
// 2つの権限ノードがラウンドrで未検証の証明書を生成
// 未検証なのはテストケースのためそうしているだけかもしれない
// 実際は証明書のダイジェストを生成
// Make two certificate (f+1) with round 3 to trigger the commits.
let (_, certificate) = test_utils::mock_certificate(
&committee,
&latest_protocol_version(),
3,
next_parents.clone(),
);
certificates.push_back(certificate);
let (_, certificate) = test_utils::mock_certificate(
&committee,
&latest_protocol_version(),
3,
next_parents,
);
certificates.push_back(certificate);
// Spawn the consensus engine and sink the primary channel.
let (tx_new_certificates, rx_new_certificates) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);
let (tx_output, mut rx_output) = test_utils::test_channel!(1);
let (tx_consensus_round_updates, _rx_consensus_round_updates) =
watch::channel(ConsensusRound::new(0, 0));
let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS);
let store = make_consensus_store(&test_utils::temp_dir());
let cert_store = make_certificate_store(&test_utils::temp_dir());
// ガベージコレクションのラウンドの深さは50ラウンド,多分デフォルトと同じ?
let gc_depth = 50;
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
// Bullsharkコンセンサスのインスタンス作る
let bullshark = Bullshark::new(
committee.clone(),
store.clone(), // Arc<ConsensusStore>, 永続ストレージ,クラッシュ回復をサポート
latest_protocol_version(),
metrics.clone(),
NUM_SUB_DAGS_PER_SCHEDULE, // 最後に成功したリーダー選出のタイムスタンプ
LeaderSchedule::new(committee.clone(), // スケジュール変更と評判スコアリセットをトリガーするサブDAGの数。
LeaderSwapTable::default()), // ラウンドのリーダーを見つけるために使用されるスケジュール。
);
// ここでコンセンサスプロトコルを実行
// consensus state以外はimmutable
// 最新のサブDAGのコミットされた最後のラウンドとリーダーラウンドが一致しているか確認
// ConsensusState::new_from_store()でストアからのデータで初期化??
// spawn_logged_monitored_task!(s.run(), "Consensus", INFO)で次の関数へ?
let _consensus_handle = Consensus::spawn(
committee,
gc_depth,
store,
cert_store,
tx_shutdown.subscribe(),
rx_new_certificates, // 証明書の全履歴がないときプライマリから証明書を受け取る
tx_primary, // output: 順序付けされた証明書のシーケンスをプライマリに送信します(クリーンアップとフィードバック用)。
tx_consensus_round_updates, // output: コンセンサスにおける最高コミットされたラウンドに対応するgc_round
tx_output, // output: アプリケーション層へ順序付けされた証明書のシーケンス(多分アプリケーションというのはNarwhalのこと?)
bullshark, // Bullshark構造体,実行するコンセンサスプロトコルの指定
metrics,
);
// 非同期タスクとして,rx_primaryというチャネルでis_some()がfalseになるまで受信し続ける
tokio::spawn(async move { while rx_primary.recv().await.is_some() {} });
// 全ての証明書を非同期でチャネルに渡す
// 証明書の集合に対してpop_front()で基本的に古いもの?から順番に取り出し,キューから削除する,ラウンドごとになっているはず
while let Some(certificate) = certificates.pop_front() {
tx_new_certificates.send(certificate).await.unwrap();
}
// チャネルに送信されたメッセージをrecv()で取得し,CommittedSubDag構造体を格納
let committed_sub_dag: CommittedSubDag = rx_output.recv().await.unwrap();
// コミットされた証明書のシーケンスを表しているcertificates(Vec<Certificate>)の所有権イテレータに移動して変更可能なsequence変数に格納する
let mut sequence = committed_sub_dag.certificates.into_iter();
// 最初の証明書4つがラウンド1で行われたものであることを確認する
for _ in 1..=4 {
let output = sequence.next().unwrap();
assert_eq!(output.round(), 1);
}
let output = sequence.next().unwrap();
assert_eq!(output.round(), 2);
// オーソリティの総数が4であることを確認
assert_eq!(committed_sub_dag.reputation_score.total_authorities(), 4);
// オーソリティの評判スコアを0に更新
assert!(committed_sub_dag.reputation_score.all_zero());
}
--- 以下を削除すべきかを検討する
ラウンド2までは証明書作成して,ラウンド3以降の親証明書の準備をする
ラウンド1で4つの証明書が実行か作られたかをあとで確認しているんだけど,どこでそれをやっているのか?
code:.rs
let (mut certificates, next_parents) = test_utils::make_optimal_certificates(
&committee, // committee
&latest_protocol_version(), // protocol_config
1..=2, // range
&ids, // ids <- 権限ノード(AuthorityFixture)の数だけ
);
make_optimal_certificates()では未署名の証明書を1つ作成して,あとは次のラウンドで必要な証明書のダイジェストとかだけ
certificateややこしいので
code:.rs
// ただmake_certificates()に渡してるだけ
pub fn make_optimal_certificates(
committee: &Committee,
protocol_config: &ProtocolConfig,
range: RangeInclusive<Round>,
initial_parents: &BTreeSet<CertificateDigest>,
) -> (VecDeque<Certificate>, BTreeSet<CertificateDigest>) {
make_certificates(committee, protocol_config, range, initial_parents, ids, 0.0)
}
// サンプル数の親を持つ署名のない証明書を「ラウンド」分作成
pub fn make_certificates(
committee: &Committee,
protocol_config: &ProtocolConfig,
range: RangeInclusive<Round>,
initial_parents: &BTreeSet<CertificateDigest>,
failure_probability: f64,
) -> (VecDeque<Certificate>, BTreeSet<CertificateDigest>) {
let generator =
|pk, round, parents| mock_certificate(committee, protocol_config, pk, round, parents);
rounds_of_certificates(range, initial_parents, ids, failure_probability, generator)
}
// 指定されたラウンド、オリジン、親から、署名の不適切な証明書を生成します。
// 注:証明書は作成者ではなくランダムなキーによって署名されます。
pub fn mock_certificate(
committee: &Committee,
protocol_config: &ProtocolConfig,
origin: AuthorityIdentifier,
round: Round,
parents: BTreeSet<CertificateDigest>,
) -> (CertificateDigest, Certificate) {
mock_certificate_with_epoch(committee, protocol_config, origin, round, 0, parents)
}
//
fn rounds_of_certificates(
range: RangeInclusive<Round>,
initial_parents: &BTreeSet<CertificateDigest>,
failure_probability: f64,
make_one_certificate: impl Fn(
AuthorityIdentifier,
Round,
BTreeSet<CertificateDigest>,
) -> (CertificateDigest, Certificate),
) -> (VecDeque<Certificate>, BTreeSet<CertificateDigest>) {
let mut certificates = VecDeque::new();
let mut parents = initial_parents.iter().cloned().collect::<BTreeSet<_>>();
let mut next_parents = BTreeSet::new();
// range内かつidの数だけcertificateを生成
for round in range {
next_parents.clear();
for id in ids {
let this_cert_parents = this_cert_parents(&parents, failure_probability);
let (digest, certificate) = make_one_certificate(*id, round, this_cert_parents);
certificates.push_back(certificate);
next_parents.insert(digest);
}
parents = next_parents.clone();
}
(certificates, next_parents)
}
let (mut certificates, next_parents) = test_utils::make_optimal_certificates関数
では、certificatesと複数形であり、バリデータ数をどう計算しているかはわからないが、ラウンドrで複数の証明書が生成されているはず
committeeの数だけ証明書作ってそう
let committee = fixture.committee();
が何をやっているのか?
let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect();
certificate生成時の引数に注目すること
ラウンド3で証明書を作成とコミットのトリガーとする
code:.rs
let (_, certificate) = test_utils::mock_certificate(
&committee,
&latest_protocol_version(),
3, // round
next_parents.clone(), // round2までの証明書をコピー
);
certificates.push_back(certificate); // 追加
test_utils::mock_certificate()について
Bullshark構造体を新しく作って,コンセンサスエンジンを起動する
code:.rs
let bullshark = Bullshark::new(
committee.clone(),
store.clone(),
latest_protocol_version(),
metrics.clone(),
NUM_SUB_DAGS_PER_SCHEDULE,
LeaderSchedule::new(committee.clone(), LeaderSwapTable::default()),
);
let _consensus_handle = Consensus::spawn(
committee,
gc_depth,
store,
cert_store,
tx_shutdown.subscribe(),
rx_new_certificates,
tx_primary,
tx_consensus_round_updates,
tx_output,
bullshark,
metrics,
);
全ての証明書のコンセンサスへ送信
最後の証明書がトリガーになる(DAGだからね)
code:.rs
while let Some(certificate) = certificates.pop_front() {
tx_new_certificates.send(certificate).await.unwrap();
}
コミットされた証明書の確認
最初の4つの証明書がラウンド1であること
リーダーの証明書はラウンド2にコミットされているか
code:.rs
let committed_sub_dag: CommittedSubDag = rx_output.recv().await.unwrap();
let mut sequence = committed_sub_dag.certificates.into_iter();
for _ in 1..=4 {
let output = sequence.next().unwrap();
assert_eq!(output.round(), 1);
}
let output = sequence.next().unwrap();
assert_eq!(output.round(), 2);
reputation scoreはバリデータがルール通り正しく実行したかを評価し,DPoS内の分配の重みづけに使われるものであり,コンセンサス実行ではなく,実行のためのインセンティブのために利用されるにすぎないためここでは議論に含めない.